/**
 * @Author: liaoPengLiang
 * @File:  receiver
 * @Version: 1.0.0
 * @Date: 2023/3/24
 * @Description:
 */

package tunnelx

import (
	"fmt"
	utils "github.com/alibaba/MongoShake/v2/common"
	module "github.com/alibaba/MongoShake/v2/modules"
	"github.com/alibaba/MongoShake/v2/oplog"
	"github.com/alibaba/MongoShake/v2/tunnel"
	"go.mongodb.org/mongo-driver/bson"
)

type Op string

func (op Op) String() string {
	switch op {
	case OpI:
		return "新增"
	case OpU:
		return "修改"
	case OpD:
		return "删除"
	}
	return ""
}

const (
	OpI Op = "i"
	OpU Op = "u"
	OpD Op = "d"
)

const (
	PendingQueueCapacity = 256
)

type Replayer struct {
	Retransmit bool  // need re-transmit
	Ack        int64 // ack number

	// current compressor construct by TMessage
	// Compress field specific
	compressor module.Compress

	// pending queue, use to pass message
	pendingQueue chan *MessageWithCallback

	id int // current replayer id
}

type MessageWithCallback struct {
	message    *tunnel.TMessage
	completion func()
}

func NewReplayer(id int) *Replayer {
	r := &Replayer{
		pendingQueue: make(chan *MessageWithCallback, PendingQueueCapacity),
		id:           id,
	}
	go r.handler()
	return r
}

func (r *Replayer) GetAckEd() int64 {
	return r.Ack
}

//
// Sync
//  @Description: 同步处理mongoShake发生信息, 如下是实现流程
// 		1. 如果需要重新发就将当前信息废除
//		2. 校验和验证
//		3. 解压
//		4. 将信息放入通道中并进行业务处理
//  @param msg mongoShake发送信息 配置为
//  @param completion
//  @return int64
//
func (r *Replayer) Sync(msg *tunnel.TMessage, completion func()) int64 {
	// 告诉收集器我们需要重新传输所有未备份的操作日志优先
	// 这总是发生在接收机重启!
	if r.Retransmit {
		if msg.Tag&tunnel.MsgRetransmission == 0 {
			return tunnel.ReplyRetransmission
		}
		r.Retransmit = false
	}

	// 验证校验和值
	if !r.validate(msg) {
		return tunnel.ReplyChecksumInvalid
	}

	// 解压
	err, reply := r.decompress(msg)
	if err != nil {
		return reply
	}

	// 写入通道处理
	r.pendingQueue <- &MessageWithCallback{message: msg, completion: completion}

	return r.GetAckEd()
}

// 验证
func (r *Replayer) validate(msg *tunnel.TMessage) bool {
	if msg.Checksum != 0 {
		recalculated := msg.Crc32()
		if recalculated != msg.Checksum {
			r.Retransmit = true
			return false
		}
	}
	return true
}

// 解压
func (r *Replayer) decompress(msg *tunnel.TMessage) (err error, rep int64) {
	if msg.Compress == module.NoCompress {
		return nil, 0
	}
	if r.compressor, err = module.GetCompressorById(msg.Compress); err != nil {
		r.Retransmit = true
		return err, tunnel.ReplyCompressorNotSupported
	}

	var decompress [][]byte
	for _, toDecompress := range msg.RawLogs {
		bits, err := r.compressor.Decompress(toDecompress)
		if err == nil {
			decompress = append(decompress, bits)
		}
	}
	if len(decompress) != len(msg.RawLogs) {
		r.Retransmit = true
		return err, tunnel.ReplyDecompressInvalid
	}
	msg.RawLogs = decompress
	return nil, 0
}

//
// handler
//  @Description: 针对mongoShake同步信息处理
//  @receiver r
//
func (r *Replayer) handler() {
	for msg := range r.pendingQueue {
		count := uint64(len(msg.message.RawLogs))
		if count == 0 {
			continue
		}

		oplogs, err := r.parseMsg(msg)
		if err != nil {
			return
		}

		if callback := msg.completion; callback != nil {
			callback()
		}

		// get the newest timestamp
		n := len(oplogs)
		lastTs := utils.TimeStampToInt64(oplogs[n-1].Timestamp)
		r.Ack = lastTs

		// add logical code below
		for _, oplog := range oplogs {
			handler, ok := routes[oplog.Namespace]
			if ok {
				var err error
				op := Op(oplog.Operation)
				switch op {
				case OpI:
					err = handler.Insert(oplog)
				case OpU:
					err = handler.Update(oplog)
				case OpD:
					err = handler.Delete(oplog)
				}

				if err != nil {
					// 同步失败处理
					fmt.Printf("oplog sync err mongo db = %s, option = %s, query = %s, err = %s \n", oplog.Namespace, op.String(), oplog.Query, err.Error())
				}
			} else {
				// 查询不到处理对象
				fmt.Printf("oplog hander not found namespace %s", oplog.Namespace)
			}
		}
	}
}

func (r *Replayer) parseMsg(msg *MessageWithCallback) ([]*oplog.ParsedLog, error) {
	oplogs := make([]*oplog.ParsedLog, len(msg.message.RawLogs))
	for i, raw := range msg.message.RawLogs {
		oplogs[i] = new(oplog.ParsedLog)
		if err := bson.Unmarshal(raw, &oplogs[i]); err != nil {
			// impossible switch, need panic and exit
			return oplogs, err
		}
	}
	return oplogs, nil
}
